package com.hulu.slow.kafka;

import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.Counter;
import com.google.common.util.concurrent.RateLimiter;
import com.sankuai.mms.cfg.Config;
import com.sankuai.xm.kafka.client.IConsumerProcessor;
import com.sankuai.xm.kafka.client.IMessageListener;
import com.sankuai.xm.kafka.client.factory.KafkaConsumerBuildFactory;
import com.sankuai.xm.kafka.client.utils.NamedThreadFactory;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import com.zs.pig.common.utils.JSONSerializerUtil;
import com.zs.pig.common.utils.JsonUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Reads messages from Kafka and processes them on a worker thread pool.
 *
 * <p>Only the "business" consumer is currently created in {@link #init()}; the
 * red-packet, nyed and send-sms consumers are declared but their creation is
 * commented out. Each received message is throttled through a Guava
 * {@link RateLimiter} and then handed to a fixed pool of 64 worker threads.
 */
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    /** Queue of failed messages (capacity 5000). It is not read or written inside this class. */
    public static LinkedBlockingQueue<Object[]> businessQueue = new LinkedBlockingQueue<Object[]>(5000);

    /** Default consumption rate limit, used when the configured value is missing or invalid. */
    private static final int DEFAULT_RATE_LIMITER = 10000;

    /** The stats line is logged once every this many processed messages. */
    private static final long STATS_LOG_INTERVAL = 1000;

    private IConsumerProcessor businessConsumer;
    // The consumers below are declared but never assigned; see the disabled lines in init().
    private IConsumerProcessor redpacketConsumer;
    private IConsumerProcessor nyedConsumer;
    private IConsumerProcessor sendsmsConsumer;
    private IConsumerProcessor sendsmsConsumer2;

    private RateLimiter businessLimiter;
    private RateLimiter redpacketLimiter;
    private RateLimiter nyedLimiter;
    private RateLimiter sendsmsLimiter;

    /** Worker pool on which business messages are processed. */
    private static final ThreadPoolExecutor businesspool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(64);

    /** Number of business messages picked up by the worker pool so far. */
    private static final AtomicLong count = new AtomicLong();

    // Disabled metrics: error updating long-video status to MySQL, and retry errors.
    // private static Counter updateLongVideoStatusError = MetricBeans.counter("business.longvideo.updateLongVideoStatus.error");
    // private static Counter retryError = MetricBeans.counter("business.retry.error");

    /**
     * Reloads the configuration, creates the rate limiters and builds the
     * business consumer.
     *
     * <p>Any exception raised here is logged and terminates the JVM with exit
     * code -1.
     */
    @PostConstruct
    private void init() {
        try {
            Config.reload();
            int rateLimiter = resolveRateLimit();

            // The limiters are only created when not already set.
            if (businessLimiter == null) {
                businessLimiter = RateLimiter.create(rateLimiter);
            }
            if (redpacketLimiter == null) {
                redpacketLimiter = RateLimiter.create(rateLimiter);
            }
            if (nyedLimiter == null) {
                nyedLimiter = RateLimiter.create(rateLimiter);
            }
            if (sendsmsLimiter == null) {
                sendsmsLimiter = RateLimiter.create(rateLimiter);
            }

            // NOTE(review): the arguments look like configuration keys (topic, group id,
            // thread count, ZooKeeper address) - confirm against KafkaConsumerBuildFactory.
            businessConsumer = KafkaConsumerBuildFactory.init("business.kafka.topic", "business.group.id", "business.consumer.thread.num", "business.zookeeper.connect");
//            redpacketConsumer = KafkaConsumerBuildFactory.init("redpacket.kafka.topic", "redpacket.group.id", "redpacket.consumer.thread.num", "redpacket.zookeeper.connect");
//            nyedConsumer = KafkaConsumerBuildFactory.init("nyed.kafka.topic", "nyed.group.id", "nyed.consumer.thread.num", "nyed.zookeeper.connect");
//            sendsmsConsumer = KafkaConsumerBuildFactory.init("sendsms.kafka.topic", "sendsms.group.id", "sendsms.consumer.thread.num", "sendsms.zookeeper.connect");
//            sendsmsConsumer2 = KafkaConsumerBuildFactory.init("sendsms.kafka.topic", "sendsms2.group.id", "sendsms.consumer.thread.num", "sendsms.zookeeper.connect");
        } catch (Exception e) {
            log.error("KafkaConsumer init error.", e);
            System.exit(-1);
        }
    }

    /**
     * Reads {@code consumer.rate.limiter} from the configuration.
     *
     * @return the configured value when it is a positive integer, otherwise
     *         {@link #DEFAULT_RATE_LIMITER}
     */
    private static int resolveRateLimit() {
        String raw = Config.get("consumer.rate.limiter");
        if (raw == null || raw.trim().isEmpty()) {
            log.warn("consumer.rate.limiter is not configured, using default {}", DEFAULT_RATE_LIMITER);
            return DEFAULT_RATE_LIMITER;
        }
        try {
            int configured = Integer.parseInt(raw.trim());
            return configured > 0 ? configured : DEFAULT_RATE_LIMITER;
        } catch (NumberFormatException e) {
            // A malformed value should not stop the consumer from starting; fall back instead.
            log.warn("consumer.rate.limiter value '{}' is not a valid integer, using default {}",
                    raw, DEFAULT_RATE_LIMITER);
            return DEFAULT_RATE_LIMITER;
        }
    }

    /**
     * Registers the listener for business messages.
     *
     * <p>For every message the listener first acquires a permit from the
     * business rate limiter (blocking the calling thread until one is
     * available) and then submits the message to the worker pool.
     * {@link #init()} must have completed first, otherwise the consumer and
     * limiter are still {@code null}.
     */
    public void start() {

        // Business category
        businessConsumer.recvMessageWithParallel(byte[].class, new IMessageListener() {
            @Override
            public void recvMessage(final byte[] bytes) {
                businessLimiter.acquire();
                businesspool.execute(new Runnable() {
                    @Override
                    public void run() {
                        handleBusinessMessage(bytes);
                    }
                });
            }
        });

    }

    /**
     * Decodes one business message and dispatches it on its {@code btype}.
     *
     * <p>Failures are logged and never propagate to the worker thread.
     *
     * @param bytes the raw message payload as received from the consumer
     */
    private void handleBusinessMessage(byte[] bytes) {
        // Use the value returned by the increment so that exactly one thread
        // observes each multiple of the interval.
        long processed = count.incrementAndGet();
        if (processed % STATS_LOG_INTERVAL == 0) {
            log.error("PoolSize={}, QueueSize={}, CompletedTaskCount={}",
                    businesspool.getPoolSize(), businesspool.getQueue().size(),
                    businesspool.getCompletedTaskCount());
        }

        String btype = null;
        try {
            JSONObject jsonObject = JSONSerializerUtil.unserialize(bytes, JSONObject.class);
            if (log.isInfoEnabled()) {
                log.info("businessConsumer receive: {}", jsonObject);
            }

            btype = jsonObject.getString("btype");
            // Payload for the per-type handlers; the only active case does not use it yet.
            JSONObject datajson = jsonObject.getJSONObject("data");

            if (btype == null) {
                // A switch on a null String throws NullPointerException, so reject it explicitly.
                log.warn("businessConsumer received a message without btype: {}", jsonObject);
                return;
            }

            switch (btype) {
                case "test": {
                    System.out.println("test topic");
                    // wallettoalipayService.withdrawals(jsondata, maxRetry, count);
                    break;
                }
                default:
                    break;
            }
        } catch (Exception e) {
            // btype is still null when decoding itself failed; guard the switch so the
            // original exception is always logged instead of being replaced by an NPE.
            if (btype != null) {
                switch (btype) {
                    case "saveaddressbook":
                        // saveaddressbookError.inc();
                        break;
                    default:
                        break;
                }
            }
            log.error("businessConsumer recvMessage error.", e);
        }
    }

    /**
     * Creates a scheduled executor with one thread, built by a
     * {@link NamedThreadFactory}. It is not called from within this class.
     *
     * @param poolname name passed to the thread factory
     * @return the new scheduler
     */
    private static ScheduledExecutorService createScheduler(String poolname) {
        // NOTE(review): the 'true' flag presumably marks the threads as daemon - confirm.
        return Executors.newScheduledThreadPool(1, new NamedThreadFactory(poolname, true));
    }

}
